【新機能】Kinesis Analyticsが利用可能になりました!
ウィスキー、シガー、パイプをこよなく愛する大栗です。
8/10〜11に開催していたのAWS Summit New York 2016のKeynoteの中で、Kinesis Analyticsが利用可能になったという発表がありましたので試してみました。
Kinesis Analytics
Kinesis Analyticsはre:Invent 2015で発表されたストリームデータ処理サービスです。当時は来年に使用可能になるとの発表のみで詳細が不明でしたが、ようやく利用可能になりました。
Kinesis Analyticsはストリームデータに対してSQLで処理を行う機能です。Kinesis Stream + Lambdaでもストリーム処理は可能ですが、基本的には1レコード単位か1回に取り出したデータしかまとめて処理ができませんでした。Kinesis Analyticsでは、データの変換を行ったり、タイムウィンドウの操作も可能になっています。例えば1分毎の合計値を出したり1、過去1分間の最大値を取得する2こともできます。
なお、2016年8月12日現在で利用可能なリージョンは、以下の通りとなっています。
- アイルランド
- 北部バージニア
- オレゴン
Kinesis Analyticsを試す
ドキュメントのGetting Started Exerciseの内容を元に試してみます。
Getting Started Exerciseは以下の流れとなります。
- アプリケーションを作成する
- 入力を設定する
- リアルタイム分析を追加する
- アプリケーションコードを更新する
- 出力を設定する
アプリケーションを作成する
Kinesis Analyticsのアプリケーションを作成します。
Kinesis Analyticsの画面でCreate new application
をクリックします。
アプリケーション名と説明を入力します。ここではAnalytics-01
とします。
入力を設定する
Connect to a source
をクリックします。
Configure a new stream
を選択して、Create demo stream
をクリックします。ここではデモ用データを使用するためCreate demo stream
を選択しています。入力で新規のKinesis Firehoseや新規のKinesis Streamを作成する場合は各々のボタンをクリックします。既存のStreamを使用する場合は不要ですので、ここを飛ばして下さい。
Configure a new stream
を選択するとデモ用Streamが作成されます。
デモ用Streamが作成されたら、Select a stream
を選択します。kinesis-analytics-demo-stream
を選択してSave and continue
をクリックします。既存のStreamを使用する場合は、ここで対象のStreamを選択して下さい。
なお、kinesis-analytics-demo-stream
のデータは以下のようなデータになっています。
{"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY","CHANGE":1.49,"PRICE":19.74}, {"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY","CHANGE":-0.73,"PRICE":22.96}, {"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":0.69,"PRICE":172.71}, {"TICKER_SYMBOL":"BAC","SECTOR":"FINANCIAL","CHANGE":-0.28,"PRICE":13.23}
リアルタイム分析を追加する
Go to SQL editor
をクリックします。
ポップアップが表示されるのでYes, start application
をクリックします。
Add SQL from templates
をクリックしてテンプレートからSQLを追加します。
Continuous filter
を選択してAdd this SQL to the editor
をクリックします。
SQLの中には以下の通りです。
このSQLはDESTINATION_SQL_STREAM
Streamを作成しています。
そしてSTREAM_PUMP
を作成して、SOURCE_SQL_STREAM_001
からSIMILARに"TECH"が含まれているデータについてticker_symbol, sector, change, priceをDESTINATION_SQL_STREAM
へ挿入しています。
-- ** Continuous Filter ** -- Performs a continuous filter based on a WHERE condition. -- .----------. .----------. .----------. -- | SOURCE | | INSERT | | DESTIN. | -- Source-->| STREAM |-->| & SELECT |-->| STREAM |-->Destination -- | | | (PUMP) | | | -- '----------' '----------' '----------' -- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE -- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM -- Create output stream, which can be used to send to a destination CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL); -- Create pump to insert into output CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" -- Select all columns from source stream SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" -- LIKE compares a string to a string pattern (_ matches all char, % matches substring) -- SIMILAR TO compares string to a regex, may use ESCAPE WHERE sector SIMILAR TO '%TECH%';
Reading from input stream
をクリックしてStreamデータを読み込みます。
Real-time analytics
タブを見るとDESTINATION_SQL_STREAM
が出力されます。
アプリケーションコードを更新する
次にアプリケーションコードを更新します。
以下のSQLを追加します。
これはDESTINATION_SQL_STREAM_2
Streamを作成しています。
そしてSTREAM_PUMP_2
を作成して、DESTINATION_SQL_STREAM
Streamからticker_symbol, change, priceをDESTINATION_SQL_STREAM_2
に挿入しています。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_2" (ticker_symbol VARCHAR(4), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS INSERT INTO "DESTINATION_SQL_STREAM_2" SELECT STREAM ticker_symbol, change, price FROM "DESTINATION_SQL_STREAM";
Save and run SQL
をクリックするとDESTINATION_SQL_STREAM_2
のデータが出力されます。
もししばらく待ってもデータが出力されない時には、Source data
タブでBegin populating stream with sample stock ticker data
をクリックして下さい。サンプルデータが流れ始めます。
さらにSQLを追加してみます。
出力用のStreamとしてAMZN_STREAM
とTGT_STREAM
を作成しています。
AMZN_PUMP
でSIMILARに"AMZN"が含まれるデータをAMZN_STREAM
へ挿入しています。
TGT_PUMP
でSIMILARに"TGT"が含まれるデータをTGT_STREAM
へ挿入しています。
CREATE OR REPLACE STREAM "AMZN_STREAM" (ticker_symbol VARCHAR(4), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "AMZN_PUMP" AS INSERT INTO "AMZN_STREAM" SELECT STREAM ticker_symbol, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ticker_symbol SIMILAR TO '%AMZN%'; CREATE OR REPLACE STREAM "TGT_STREAM" (ticker_symbol VARCHAR(4), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "TGT_PUMP" AS INSERT INTO "TGT_STREAM" SELECT STREAM ticker_symbol, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ticker_symbol SIMILAR TO '%TGT%';
Save and run SQL
をクリックするとAMZN_STREAM
とTGT_STREAM
のデータが出力されます。
出力を設定する
出力先を設定します。Connect to destination
をクリックします。
ここでは新規にKinesis Firehoseを作成してS3へ保存するためConfigure a new stream
を選択してGo to Kinesis Firehose
をクリックします。
既存のStreamを選択する場合はSelect a stream
を選択して対象のStreamを選んで下さい。
ここでKinesis Firehoseを構成しますが手順は割愛します。詳しくは、以下のエントリーを参照ください。
[新機能]Amazon Kinesis FirehoseでS3にデータを送ってみた #reinvent
Kinesis FirehoseでS3への出力を構成したら、Select a stream
を選択して作成したKinesis Firehoseを選択します。ここでは出力対象のStreamはDESTINATION_SQL_STREAM
としてSave and continue
をクリックします。
これでKinesis Analyticsの設定は完了です。
出力結果を確認する
Kinesis Firehoseで出力したS3のデータを確認します。
以下の様にS3へファイルが出力されます。
ファイルの内容を確認してみます。出力形式にJSONを指定したのでJSONで出力されています。
{"TICKER_SYMBOL":"CVB","SECTOR":"TECHNOLOGY","CHANGE":"-0.23","PRICE":"60.99"} {"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"10.56"} {"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"-0.02","PRICE":"122.87"} {"TICKER_SYMBOL":"IOP","SECTOR":"TECHNOLOGY","CHANGE":"2.43","PRICE":"151.47"} {"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.13","PRICE":"10.43"} {"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"-9.71","PRICE":"601.34"} {"TICKER_SYMBOL":"DFG","SECTOR":"TECHNOLOGY","CHANGE":"2.34","PRICE":"144.8"} {"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.23","PRICE":"601.57"} {"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.09","PRICE":"5.85"} {"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY","CHANGE":"1.19","PRICE":"182.6"} {"TICKER_SYMBOL":"AAPL","SECTOR":"TECHNOLOGY","CHANGE":"1.54","PRICE":"124.41"} {"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"-0.2","PRICE":"10.23"} {"TICKER_SYMBOL":"JKL","SECTOR":"TECHNOLOGY","CHANGE":"0.08","PRICE":"10.31"} {"TICKER_SYMBOL":"AMZN","SECTOR":"TECHNOLOGY","CHANGE":"0.16","PRICE":"601.73"} {"TICKER_SYMBOL":"HJK","SECTOR":"TECHNOLOGY","CHANGE":"0.1","PRICE":"5.95"}
さいごに
Kinesis Firehoseで柔軟なストリーム処理が可能になります。特にSQLという一般的なデータ処理言語を採用しているので導入のハードルが低いのではないかと思います。処理したデータをKinesis Stream経由でElasticsearch Serviceへ出力することもコーディングなしでできるので、リアルタイムダッシュボードも簡単に実現できます。
ストリームデータは処理が面倒なものですが、随分簡単に処理できるようになったみたいです。